package com.carol.bigdata.utils

import java.security.MessageDigest

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}

import scala.collection.JavaConverters._
import scala.collection.mutable


object FuncUtil {

    // 辅助函数,获取key List
    def getKey(x: mutable.Map[String, String], keyColumn: List[String]): List[String] = {
        var key = List[String]()
        for (c <- keyColumn) {
            key = key :+ x.getOrElse(c, "")
        }
        key
    }


    // 对应MAP合并,同时相同的key对应value相加
    // map1: Map(key1->1, key2->2)
    // map2: Map(key1->2, key3->3)
    // mapAdd: Map(key1->3, key2->2, key3->3)
    def mapAddByKey(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = {
        val mapAdd: Map[String, Int] = (map1 /: map2) ((map, kv) => {
            map + (kv._1 -> (kv._2 + map.getOrElse(kv._1, 0)))
        })
        mapAdd
    }

    /**
     * 对应MAP合并,同时相同的key对应 List[value1] ++ List[value2]
     * @param  map1                  Map(key1->List(1), key2->List(2))
     * @param  map2                  Map(key1->List(2), key3->List(3))
     * @return                       Map(key1->List(1,2), key2->List(2), key3->List(3))
     */
    def mapListAddByKey(map1: Map[String, List[Int]], map2: Map[String, List[Int]]): Map[String, List[Int]] = {
        val mapAdd = (map1 /: map2) ((map, kv) => {
            map + (kv._1 -> (kv._2 ++ map.getOrElse(kv._1, List())))
        })
        mapAdd
    }

    // 辅助函数,获取value List
    def getIntValue(x: mutable.Map[String, String],
                    valueColumn: List[String],
                    valueFunc: String => String = null): List[Int] = {
        var value = List[Int]()
        //x.foreach(println)
        for (c <- valueColumn) {
            var tmp: String = x.getOrElse(c, "0")
            if (valueFunc != null && tmp != "")
                tmp = valueFunc(tmp)
            value = value :+ (if (tmp != null) tmp.toInt else 0)
        }
        value
    }

    // 辅助函数,获取value List
    def getMapIntValue(x: mutable.Map[String, String],
                       valueColumn: List[String]): List[Map[String, Int]] = {
        var value = List[Map[String, Int]]()
        //x.foreach(println)
        for (c <- valueColumn) {
            val tmp: String = x.getOrElse(c, "0")
            var res = Map[String, Int]()
            if (tmp != null && tmp != "") {
                res = defaultTagAllFunc(tmp)
            }
            value :+= res
        }
        value
    }

    // 辅助函数,获取value List
    def getStringValue(x: mutable.Map[String, String],
                       valueColumn: List[String],
                       valueFunc: String => String = null): List[String] = {
        var value = List[String]()
        //x.foreach(println)
        for (c <- valueColumn) {
            var tmp: String = x.getOrElse(c, "")
            if (valueFunc != null && tmp != "")
                tmp = valueFunc(tmp)
            value :+= tmp
        }
        value
    }

    // 辅助函数,获取value List
    def getString2MapValue(x: mutable.Map[String, String],
                           valueColumn: List[String]): List[Map[String, Int]] = {
        var value = List[Map[String, Int]]()
        //x.foreach(println)
        for (c <- valueColumn) {
            val tmp: String = x.getOrElse(c, "")
            val v = if (tmp != "") Map(tmp -> 1) else Map[String, Int]()
            value :+= v
        }
        value
    }

    // 辅助函数,获取value List
    def getMapValue(x: mutable.Map[String, String],
                    valueColumn: List[String],
                    valueFunc: String => String = null): List[Map[String, Int]] = {
        var value = List[Map[String, Int]]()
        for (c <- valueColumn) {
            var tmp = x.getOrElse(c, "")
            if (valueFunc != null && tmp != "")
                tmp = valueFunc(tmp)
            if (tmp != "") {
                val item = JSON.parseObject(tmp)
                value :+= jsonObj2MapInt(item)
            } else value :+= Map[String, Int]()
        }
        value
    }

    def getManyValue(x: mutable.Map[String, String],
                     intColumns: List[String],
                     strColumns: List[String],
                     mapColumn: List[String]): (List[Int], List[String], List[Map[String, Int]]) = {
        (getIntValue(x, intColumns), getStringValue(x, strColumns), getMapValue(x, mapColumn))
    }


    // 解析时间为统计时间
    def getStatDay(day: String, timeMode: String): String = {
        // 2020-10-23 11:00:99.789
        val mode = timeMode.toLowerCase
        if (mode == "day")
            day.substring(0, 10)
        else if (mode == "hour")
            day.substring(0, 13)
        else if (mode == "min")
            day.substring(0, 16)
        else if (mode == "year")
            day.substring(0, 4)
        else if (mode == "month")
            day.substring(0, 7)
        else if (mode == "second")
            day.substring(0, 19)
        else
            day.substring(0, 23) // 毫秒级
    }


    // 获取rowkey
    def getRowKey(x: Map[String, String], rowkeyRule: String): String = {
        var rowkeyList = List[String]()
        val rowkeyFields = rowkeyRule.split(",")
        for (i <- rowkeyFields) {
            if (i == "record") {
                // 则需要进行hash
                rowkeyList :+= hash(x.toString())
            } else {
                rowkeyList :+= x.getOrElse(i, "error")
            }
        }
        val rowKey = rowkeyList.mkString("_")
        rowKey
    }

    // 多列簇MapRDD获取rowkey
    def getRowKeyFromCFValuePair(x: Map[String, (String, String)], rowkeyRule: String): String = {
        var rowkeyList = List[String]()
        val rowkeyFields = rowkeyRule.split(",")
        for (i <- rowkeyFields) {
            if (i == "record") {
                // 则需要进行hash
                rowkeyList :+= hash(x.toString())
            } else {
                rowkeyList :+= x.getOrElse(i, ("error", "error"))._2
            }
        }
        val rowKey = rowkeyList.mkString("_")
        rowKey
    }

    // md5 hash算法
    def hash(record: String): String = {
        var md5: MessageDigest = null
        try {
            md5 = MessageDigest.getInstance("MD5")
        } catch {
            case e: Exception => e.printStackTrace()
        }
        //rowKey的组成：id_hash(info)
        val encode = md5.digest(record.getBytes())
        encode.map("%02x".format(_)).mkString
    }


    /*
     * 获取日期和渠道信息
     */
    def getKey(x: Map[String, String], timeField: String, keyFieldsList: List[String]): List[String] = {
        var day = x.getOrElse(timeField, "0000-00-00 00")
        day = day.substring(0, 10)
        val keys: Seq[String] = keyFieldsList.sorted
        var key = List[String](day)
        for (i <- keys)
            key :+= x.getOrElse(i, "")
        key
    }

    /*
     * 获取不同表的渠道字段,封装Key,Value, 将字段加入List
     */
    def getKeyValueList(x: Map[String, String],
                        timeField: String,
                        keyFieldsList: List[String],
                        valueFieldsList: List[String]): (List[String], Seq[String]) = {
        val key = getKey(x, timeField, keyFieldsList)
        var value = Seq[String]()
        for (i <- valueFieldsList.indices) {
            value :+= x.getOrElse(valueFieldsList(i), "")
        }
        (key, value)
    }


    /*
     * 获取去重统计值
     * (List(2020-10-12:09, 1, 1, 3, 1, 1),0, Set(150005, 150017, 150026, 150030))
     */
    def getSetStat(x: (List[String], Set[String]),
                   setKey: String): (List[String], Int, Set[String]) = {

        val redisKey = x._1.head + "_uid" + "_" + setKey
        val hashKey = x._1.tail.mkString("_")
        // key中添加渠道信息
        val uidSet: Set[String] = x._2.map(i => s"${hashKey}_${i}")
        //println(s"${uidSetSize}")
        (List(redisKey, hashKey), 0, uidSet)
    }

    /*
     * 计算计数统计
     * (List(2020-10-12:09, 1, 1, 3, 1, 1),12, Set())
     */
    def getCountStat(x: (List[String], Int),
                     setKey: String): (List[String], Int, Set[String]) = {
        val redisKey = x._1.head + "_" + setKey
        val hashKey = x._1.tail.mkString("_")
        (List(redisKey, hashKey), x._2, Set[String]())
    }


    // json转为List
    def jsonArr2List(jsonArray: JSONArray): List[String] = {
        var list = List[String]()
        for (i <- jsonArray.asScala) {
            list :+= i.toString
        }
        list
    }

    // json转map
    def jsonObj2Map(jsonObj: JSONObject): Map[String, String] = {
        val map: mutable.Map[String, String] = mutable.Map[String, String]()
        if (jsonObj != null) {
            val jsonKey = jsonObj.keySet()
            val iter = jsonKey.iterator()
            while (iter.hasNext) {
                val instance = iter.next()
                val value = if (jsonObj.get(instance) == null) "" else jsonObj.get(instance).toString
                map.put(instance, value)
            }
        }
        map.toMap
    }

    // json转map
    def jsonObj2MapInt(jsonObj: JSONObject): Map[String, Int] = {
        val map: mutable.Map[String, Int] = mutable.Map[String, Int]()
        if (jsonObj != null) {
            val jsonKey = jsonObj.keySet()
            val iter = jsonKey.iterator()
            while (iter.hasNext) {
                val instance = iter.next()
                val value = if (jsonObj.get(instance) == null) 0 else jsonObj.get(instance).toString.toInt
                map.put(instance, value)
            }
        }
        map.toMap
    }


    def int2tag(value: Int): String = {
        if (value > 10)
            return "high"
        else if (value > 3)
            return "middle"
        else return "low"
    }

    def double2tag(value: Double): String = {
        if (value > 10)
            return "high"
        else if (value > 3)
            return "middle"
        else return "low"
    }

    def mapMax2tag(value: Map[String, Int]): String = {
        value.maxBy(_._2)._1
    }


    // 默认模型使用的tag函数
    def defaultTagFunc(value: String): String = {
        // ["ap",{"other":5,"ad":5,"ap":6}]
        // nick_name: 聊不到
        var res = value
        try {
            val arr = JSON.parseArray(value)
            res = arr.get(0).toString
        } catch {
            case e: Exception => ""
        }
        if (res == "") "NULL" else res
    }

    // 默认模型使用的tag函数
    def defaultTagDetailFunc(value: String): String = {
        // ["ap",{"other":5,"ad":5,"ap":6}]
        // nick_name: 聊不到
        try {
            val arr: JSONArray = JSON.parseArray(value)
            arr.get(1).toString
        } catch {
            case e: Exception => ""
        }
    }

    // 默认模型使用的tag函数
    def defaultTagAllFunc(value: String): Map[String, Int] = {
        // ["ap",{"other":5,"ad":5,"ap":6}]
        // nick_name: 聊不到
        try {
            val arr: JSONArray = JSON.parseArray(value)
            Map(arr.get(0).toString -> arr.get(1).toString.toInt)
        } catch {
            case e: Exception => Map[String, Int]()
        }
    }

    // 模型输入数据的过滤函数 (0代表未实名)
    def realNameFilter(x: (List[String], mutable.Map[String, String]),
                       filterColumn: String = "is_realname",
                       train: Boolean = false): Boolean = {
        val isRealName = x._2.getOrElse(filterColumn, "NULL")
        val result: Boolean = {
            if (train) isRealName != "NULL" && isRealName != "0"
            else isRealName == "NULL" || isRealName == "0"
        }
        result
    }

    def labelList2IndexMap(labelList: List[List[String]]): List[mutable.Map[String, String]] = {
        var labelIndexMapList = List[mutable.Map[String, String]]()
        for (labels <- labelList) {
            val labelMap = mutable.Map[String, String]()
            for (i <- labels.indices) {
                labelMap.put(labels(i), i.toString)
            }
            labelIndexMapList :+= labelMap
        }
        labelIndexMapList
    }
}
